feat(nodes): add S3/object store sink node via OpenDAL #273
Conversation
Add ObjectStoreWriteNode — a new sink node that streams Binary packets to S3-compatible object storage using Apache OpenDAL. Supports AWS S3, MinIO, RustFS, GCS, Azure Blob, and other S3-compatible backends.

Key design:
- Multipart upload via OpenDAL Writer with configurable chunk size (default 5 MiB) for bounded memory usage
- Credentials resolved from env vars (precedence) or inline config, following the RTMP node's stream_key_env pattern
- Full state lifecycle: Initializing → Running → Stopped/Failed
- Tracing at each stage for runtime debugging
- Behind opt-in 'object_store' feature flag (not in default)

Includes:
- 3 sample pipelines (TTS→S3, transcode→S3, MoQ+S3 archive)
- docker-compose.rustfs.yml for local E2E validation with RustFS
- 8 unit tests covering pins, factory validation, credential resolution, and state transitions

Signed-off-by: Devin AI <devin@devin.ai>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
- Remove double state emission (emit_failed + emit_stopped) on close failure — only emit_failed, matching every other error path in the codebase and avoiding a spurious Failed→Stopped state sequence.
- Add writer.abort() on close() failure to clean up orphaned multipart uploads, matching the write-error paths.

Signed-off-by: Devin AI <devin@devin.ai>
Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
```rust
Err(_) => {
    return Err(StreamKitError::Configuration(format!(
        "Environment variable '{env}' for {label} is not set"
    )));
},
```
🟡 resolve_credential does not fall back to literal when env var is specified but missing
The resolve_credential function's own doc comment says it resolves "from an env-var name (takes precedence) or a literal fallback", and the access_key_id field doc says "If omitted, the node falls back to access_key_id_env". However, when env_name is Some (i.e., the user provided access_key_id_env), every branch of the match std::env::var(env) returns immediately — either with Ok or Err — without ever checking the literal parameter. This means if a user provides both access_key_id: "my-key" and access_key_id_env: "MY_ENV_VAR", and MY_ENV_VAR is not set in the environment, the node hard-fails instead of falling back to the literal "my-key". The word "fallback" in the doc creates the expectation that the literal will be used when the env var is unavailable, but the code never reaches the literal check at crates/nodes/src/core/object_store_write.rs:134.
This is intentional — it matches the RTMP node's stream_key_env pattern where specifying an env var name is a hard requirement: if you tell the node to read from MY_ENV_VAR, that var must exist. This prevents silent credential misconfiguration (e.g., accidentally running with a stale inline key when the env var was supposed to be set but wasn't).
The doc wording could be clearer though — "takes precedence" means "if env_name is specified, it's the authoritative source", not "try env first, fall back to literal". The access_key_id field doc ("If omitted, the node falls back to access_key_id_env") describes the reverse direction: if no literal is provided, try the env var. The two fields are independent sources, not a cascading chain.
I'll leave the behavior as-is since it's the safer default, but happy to adjust if the human reviewer prefers fallback semantics.
This is intentional and matches the RTMP node's resolve_rtmp_url_with_env behavior — when stream_key_env is set but the env var is missing, it hard-fails rather than silently falling back to stream_key.
The rationale: if a user explicitly configures access_key_id_env: "MY_VAR", a missing MY_VAR is a deployment misconfiguration that should be surfaced immediately, not silently masked by a literal fallback. The doc comment says "takes precedence" (not "falls back") — meaning if the env path is specified, it's the authoritative source and the literal is ignored.
Updated the doc comment in the latest commit to make the resolution order clearer (env var → literal → error, where specifying an env var name commits to that path).
```rust
while let Some(packet) = context.recv_with_cancellation(&mut input_rx).await {
    if let Packet::Binary { data, .. } = packet {
        stats_tracker.received();
        packet_count += 1;
        total_bytes += data.len() as u64;

        buffer.extend_from_slice(&data);

        // Flush when buffer reaches chunk_size
        while buffer.len() >= self.config.chunk_size {
            let chunk: Vec<u8> = buffer.drain(..self.config.chunk_size).collect();
            if let Err(e) = writer.write(chunk).await {
                stats_tracker.errored();
                stats_tracker.force_send();
                let msg = format!("S3 write error: {e}");
                state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
                // Attempt to abort the multipart upload to clean up
                if let Err(abort_err) = writer.abort().await {
                    tracing::error!(
                        %node_name,
                        error = %abort_err,
                        "Failed to abort S3 multipart upload after write error"
                    );
                }
                return Err(StreamKitError::Runtime(msg));
            }
            chunks_written += 1;
            tracing::debug!(
                %node_name,
                chunks_written,
                total_bytes,
                "Flushed chunk to S3"
            );
        }

        stats_tracker.sent();
        stats_tracker.maybe_send();
    } else {
        tracing::warn!(
            %node_name,
            "Received non-Binary packet, ignoring"
        );
        stats_tracker.discarded();
    }
}

// ── Flush remaining buffer ─────────────────────────────────────────
if !buffer.is_empty() {
    tracing::debug!(
        %node_name,
        remaining = buffer.len(),
        "Flushing remaining buffer to S3"
    );
    if let Err(e) = writer.write(buffer).await {
        stats_tracker.errored();
        stats_tracker.force_send();
        let msg = format!("S3 write error (final flush): {e}");
        state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
        if let Err(abort_err) = writer.abort().await {
            tracing::error!(
                %node_name,
                error = %abort_err,
                "Failed to abort S3 multipart upload after final flush error"
            );
        }
        return Err(StreamKitError::Runtime(msg));
    }
    chunks_written += 1;
}

// ── Close (finalize multipart upload) ──────────────────────────────
tracing::info!(
    %node_name,
    "Closing S3 writer (finalizing multipart upload)"
);
if let Err(e) = writer.close().await {
    stats_tracker.errored();
    stats_tracker.force_send();
    let msg = format!("Failed to finalize S3 upload: {e}");
    state_helpers::emit_failed(&context.state_tx, &node_name, &msg);
    // Attempt to abort the multipart upload to avoid orphaned parts.
    if let Err(abort_err) = writer.abort().await {
        tracing::error!(
            %node_name,
            error = %abort_err,
            "Failed to abort S3 multipart upload after close error"
        );
    }
    return Err(StreamKitError::Runtime(msg));
}

stats_tracker.force_send();
tracing::info!(
    %node_name,
    packet_count,
    total_bytes,
    chunks_written,
    key = %self.config.key,
    "ObjectStoreWriteNode finished uploading to S3"
);

state_helpers::emit_stopped(&context.state_tx, &node_name, reason);
Ok(())
```
🚩 No multipart upload abort on cancellation path
When recv_with_cancellation returns None due to a cancellation token (forced pipeline shutdown), the code falls through to flush remaining data and call writer.close() (line 367), which finalizes the multipart upload. This means a cancelled pipeline will still commit whatever data was buffered to S3. In contrast, a user might expect cancellation to abort the upload. However, this mirrors the pattern in crates/nodes/src/core/file_write.rs where the file is also flushed and closed normally after the receive loop exits, regardless of whether it was cancelled or input-closed. This is consistent but worth noting as a design decision.
Acknowledged — this is a deliberate design choice matching file_write.rs where the file is also flushed and closed normally after the receive loop exits regardless of cancellation. Flushing on cancellation is the safer default for a storage sink (avoids data loss of already-processed data). If abort-on-cancel semantics are desired, it could be added as a config option in a follow-up.
Correct observation — this is an intentional design choice matching file_write.rs. When recv_with_cancellation returns None (whether from input close or cancellation token), we flush and finalize rather than abort.
The rationale: for a recording/archive use case, partial data is generally more valuable than no data. The AbortOnDrop guard handles the truly unexpected case (Tokio task killed externally without going through the normal shutdown path), while graceful shutdown always commits what we have.
If a future use case needs "cancel = discard", that could be added as a config option (e.g. abort_on_cancel: true), but the default of "commit what you have" is the safer behavior for archival workloads.
- Add AbortOnDrop guard to abort orphaned multipart uploads on task cancellation (critical: prevents storage leaks)
- Replace drain().collect() with split_off + mem::replace for zero-copy chunk extraction
- Validate endpoint, bucket, and key are non-empty in factory (fail-fast instead of runtime S3 errors)
- Refactor resolve_credential to accept an env lookup closure, eliminating unsound std::env::set_var calls in tests
- Inline reason string at use site (remove early allocation)
- Add comment about static key limitation in moq_s3_archive.yml
- Add tests for empty endpoint/bucket/key factory validation

Signed-off-by: StreamKit Devin <devin@streamkit.dev>
Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
```yaml
# SPDX-FileCopyrightText: © 2025 StreamKit Contributors
#
# SPDX-License-Identifier: MPL-2.0

# RustFS — lightweight S3-compatible object storage for local development
# and integration testing.
```
Does this need to be at the repo root?
Good question — there are no other docker-compose files in the repo so there's no established convention. This could live under e2e/ or samples/ instead. Where would you prefer it? I'll move it.
Maybe under a new docker/ path? It sounds like this could be useful besides testing even though that's the primary use case right now, but e2e is really about playwright testing atm, not backend e2e. What do you think?
Agreed — moved to docker/docker-compose.rustfs.yml in 6b38e3b. Updated all references in the sample pipelines and the compose file's own usage comment.
Per review feedback — docker/ is a better home than repo root since this is useful beyond just e2e/playwright testing. Signed-off-by: StreamKit Devin <devin@streamkit.dev> Co-Authored-By: Claudio Costa <cstcld91@gmail.com>
Summary
Adds a new object store write sink node (`core::object_store_writer`) that streams `Binary` packets to S3-compatible object storage via Apache OpenDAL.

Key design choices

- Configurable `chunk_size` (default 5 MiB) keeps memory bounded regardless of total upload size.
- Credentials follow the `stream_key_env` pattern from `RtmpPublishNode` — env var names take precedence over inline values, so secrets stay out of pipeline YAML.
- An `AbortOnDrop` guard ensures orphaned multipart uploads are aborted if the Tokio task is cancelled mid-upload (prevents storage leaks).
- Chunk extraction via `split_off` + `mem::replace` (no `drain().collect()` double-allocation).
- `resolve_credential` accepts a lookup closure, eliminating unsound `std::env::set_var` in tests.
- Behind the opt-in `object_store` feature flag (not in `default`), following the pattern of `svt_av1`, `dav1d`, etc.

Files added/modified

- `crates/nodes/src/core/object_store_write.rs`
- `crates/nodes/src/core/mod.rs`
- `crates/nodes/Cargo.toml` — `opendal` dependency + `object_store` feature
- `samples/pipelines/oneshot/tts_to_s3.yml`
- `samples/pipelines/oneshot/transcode_to_s3.yml`
- `samples/pipelines/dynamic/moq_s3_archive.yml`
- `docker-compose.rustfs.yml`

Review & Testing Checklist for Human

- `AbortOnDrop` guard correctly aborts multipart uploads on task cancellation (spin up RustFS, start a long upload, kill the pipeline mid-stream, check for orphaned parts)
- End-to-end upload against RustFS (`docker compose -f docker-compose.rustfs.yml up`, create bucket, run a sample pipeline with the `object_store` feature enabled)
- `split_off` + `mem::replace` chunk extraction for correctness at boundary conditions (buffer exactly == chunk_size, buffer >> chunk_size)

Notes

- The `moq_s3_archive.yml` sample uses a static key (`recordings/live-session.ogg`) — every session overwrites the previous recording. A comment documents this limitation and suggests template variable support as a future enhancement.
- The validation redundancy (`range(min = 1)` + factory check) is intentional defense-in-depth — no change needed.
- `opendal` is pulled in with `default-features = false, features = ["services-s3"]` to keep the dependency footprint minimal.

Link to Devin session: https://staging.itsdev.in/sessions/a27079cc9abc4b1a9cb26b6045442ef1
Requested by: @streamer45